
Conversation


@HardMax71 HardMax71 commented Jan 11, 2026

  • removed AdminUtils (unused)

Summary by cubic

Switched Kafka client from confluent-kafka to aiokafka and moved schema registry to an async client. EventBus now filters self-published messages so handlers only process cross-instance events.

  • Refactors

    • Replaced Consumer/Producer with AIOKafkaConsumer/AIOKafkaProducer in core, EventBus, and DLQManager.
    • EventBus filters self-published messages; subscribers only handle events from other instances.
    • SchemaRegistryManager now uses AsyncSchemaRegistryClient and AsyncAvroMessageSerializer; serialize_event/deserialize_event are async.
    • ConsumerGroupMonitor uses AIOKafkaAdminClient; removed AdminUtils and its tests; added ConsumerGroupState enum.
    • Updated create_topics to AIOKafkaAdminClient; adjusted tests to await async schema operations; removed producer stats unit test.
  • Dependencies

    • Added aiokafka==0.13.0 and python-schema-registry-client==2.6.1.
    • Removed confluent-kafka and types-confluent-kafka.

Written for commit 01e9d8b. Summary will update on new commits.

Summary by CodeRabbit

  • New Features

    • Full async Kafka integration and EventBus with self-filtering; improved consumer-group monitoring with explicit enum states.
  • Bug Fixes

    • Async schema registry paths, more reliable DLQ/retry flows, and idempotency/settings cache handling that ignores self-published messages.
  • Documentation

    • New Event Bus architecture doc and updated user-settings/events guidance.
  • Chores

    • Kafka client stack and schema-registry client migrated; removal of legacy admin utilities.
  • Tests

    • Integration tests converted to async patterns; an obsolete admin test removed.

✏️ Tip: You can customize this high-level summary in your review settings.


coderabbitai bot commented Jan 11, 2026

Note

Currently processing new changes in this PR. This may take a few minutes, please wait...

📥 Commits

Reviewing files that changed from the base of the PR and between 74a5971 and 01e9d8b.

📒 Files selected for processing (1)
  • backend/app/services/event_bus.py
 ______________________________________________________
< GPU fans at max RPM: ready to blow away regressions. >
 ------------------------------------------------------
  \
   \   (\__/)
       (•ㅅ•)
       /   づ

Tip

You can disable sequence diagrams in the walkthrough.

Disable the reviews.sequence_diagrams setting in your project's settings in CodeRabbit to disable sequence diagrams in the walkthrough.

📝 Walkthrough

Walkthrough

This PR migrates the Kafka and schema-registry stacks from the synchronous confluent-kafka client to async aiokafka and an async schema-registry client across the DLQ, producers, consumers, EventBus, admin tooling, and tests. Lifecycle and I/O paths become awaitable, and types, headers, and offset handling are adapted accordingly.

Changes

Cohort / File(s) Summary
DLQ & Retry Manager
backend/app/dlq/manager.py
Replaced confluent Consumer/Producer with AIOKafkaConsumer/AIOKafkaProducer; message access moved to attributes; startup/shutdown and polling made async (start/stop, getone()); sends use send_and_wait; commits awaited; signatures/types updated.
Core Consumer & Producer
backend/app/events/core/consumer.py, backend/app/events/core/producer.py
Swapped to aiokafka types; consumer loop uses getone() with timeout; producer uses send_and_wait; removed delivery callbacks, threads, and flush semantics; headers/value access and tracing adapted for aiokafka.
Schema Registry (async)
backend/app/events/schema/schema_registry.py, backend/app/core/providers.py
Replaced sync registry with async client/serializer (AsyncSchemaRegistryClient/AsyncAvroMessageSerializer); made register/serialize/deserialize async; factory function removed in favor of direct SchemaRegistryManager instantiation.
EventBus & Services
backend/app/services/event_bus.py, backend/app/services/user_settings_service.py
EventBus now uses AIOKafkaProducer/AIOKafkaConsumer; publish uses send_and_wait with source_instance header; listener filters self-published messages and uses async getone(); lifecycle and startup/shutdown are async.
Consumer Group Monitoring
backend/app/events/consumer_group_monitor.py
Reworked to use AIOKafkaAdminClient/AIOKafkaConsumer; added ConsumerGroupState enum, DescribedGroup, parsing helpers, async admin lifecycle, and updated lag/health computation and return types.
Admin Utilities & Topic Creation
backend/app/events/admin_utils.py (deleted), backend/scripts/create_topics.py
Deleted AdminUtils helper; create_topics.py migrated to AIOKafkaAdminClient with async start/close, awaited list/create, updated NewTopic args, and TopicAlreadyExistsError handling.
Idempotency Middleware
backend/app/services/idempotency/middleware.py
Adjusted to access message.value and await async deserialize_event; topic read as property.
Tests (integration & unit)
backend/tests/integration/*.py, backend/tests/unit/events/core/test_producer.py
Many tests converted to async (pytest.mark.asyncio) and updated to await schema/producer operations; removed admin_utils integration test and one unit test.
Dependencies & Config
backend/pyproject.toml, backend/app/settings.py
Removed confluent-kafka, added aiokafka and python-schema-registry-client; SCHEMA_REGISTRY_AUTH default changed from None to "" with validation.
Docs & Navigation
docs/architecture/event-bus.md, docs/architecture/user-settings-events.md, mkdocs.yml, docs/components/schema-manager.md
Added EventBus architecture doc, updated user-settings docs and schema-manager docs to reflect async registry and EventBus behavior; added nav entry.

Sequence Diagram(s)

sequenceDiagram
  participant Publisher
  participant SchemaReg as SchemaRegistry
  participant Producer as AIOKafkaProducer
  participant Kafka as KafkaTopic
  participant Consumer as AIOKafkaConsumer
  participant Subscriber

  Publisher->>SchemaReg: serialize_event(event) (await)
  SchemaReg-->>Publisher: bytes
  Publisher->>Producer: send_and_wait(topic, bytes, headers) (await)
  Producer-->>Kafka: append message
  Kafka-->>Consumer: deliver message
  Consumer->>Subscriber: deserialize_event(bytes) (await)
  Subscriber-->>Consumer: acknowledge / process result

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~50 minutes

Possibly related PRs

Suggested labels

enhancement

Poem

🐰 I hopped from sync to async with a twitch of my nose,

Producers await, consumers getone as the Kafka river flows,
Headers in tuples, offsets set as fields, neat and small,
Schemas register async — no blocking at all,
A rabbit cheers the new async sprawl!

🚥 Pre-merge checks | ✅ 2 | ❌ 1
❌ Failed checks (1 warning)
  • Docstring Coverage: ⚠️ Warning. Docstring coverage is 58.82%, which is below the required threshold of 80.00%. Resolution: write docstrings for the functions missing them to satisfy the coverage threshold.
✅ Passed checks (2 passed)
  • Description Check: ✅ Passed. Check skipped - CodeRabbit's high-level summary is enabled.
  • Title Check: ✅ Passed. The title 'feat: confluence kafka -> aiokafka' clearly summarizes the main change: replacing Confluent Kafka with aiokafka. It is concise, specific, and directly reflects the primary objective of the changeset.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing touches
  • 📝 Generate docstrings

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.


Comment @coderabbitai help to get the list of available commands and usage tips.


codecov-commenter commented Jan 12, 2026

⚠️ Please install the Codecov GitHub app to ensure uploads and comments are reliably processed by Codecov.

Codecov Report

❌ Patch coverage is 73.84106% with 79 lines in your changes missing coverage. Please review.

Files with missing lines Patch % Lines
backend/app/events/consumer_group_monitor.py 63.26% 54 Missing ⚠️
backend/app/events/core/producer.py 75.00% 7 Missing ⚠️
backend/app/events/core/consumer.py 83.33% 6 Missing ⚠️
backend/app/dlq/manager.py 82.75% 5 Missing ⚠️
backend/app/services/event_bus.py 89.28% 3 Missing ⚠️
backend/app/events/schema/schema_registry.py 93.10% 2 Missing ⚠️
backend/app/services/idempotency/middleware.py 0.00% 2 Missing ⚠️
❗ Your organization needs to install the Codecov GitHub app to enable full functionality.
Flag Coverage Δ
backend-e2e 58.79% <25.16%> (+0.05%) ⬆️
backend-integration 74.80% <73.84%> (+0.04%) ⬆️
backend-unit 61.30% <11.25%> (+0.15%) ⬆️

Flags with carried forward coverage won't be shown.

Files with missing lines Coverage Δ
backend/app/core/providers.py 95.11% <100.00%> (ø)
backend/app/services/user_settings_service.py 88.23% <ø> (-1.69%) ⬇️
backend/app/settings.py 100.00% <100.00%> (ø)
backend/app/events/schema/schema_registry.py 92.10% <93.10%> (+0.07%) ⬆️
backend/app/services/idempotency/middleware.py 69.72% <0.00%> (ø)
backend/app/services/event_bus.py 80.21% <89.28%> (+1.30%) ⬆️
backend/app/dlq/manager.py 70.12% <82.75%> (-1.60%) ⬇️
backend/app/events/core/consumer.py 83.67% <83.33%> (+3.29%) ⬆️
backend/app/events/core/producer.py 82.10% <75.00%> (-3.30%) ⬇️
backend/app/events/consumer_group_monitor.py 67.67% <63.26%> (+7.87%) ⬆️

... and 5 files with indirect coverage changes

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.
  • 📦 JS Bundle Analysis: Save yourself from yourself by tracking and limiting bundle sizes in JS merges.


@cubic-dev-ai cubic-dev-ai bot left a comment


7 issues found across 19 files

Prompt for AI agents (all issues)

Check if these issues are valid — if so, understand the root cause of each and fix them.


<file name="backend/app/events/consumer_group_monitor.py">

<violation number="1" location="backend/app/events/consumer_group_monitor.py:115">
P2: Silent exception swallowing hides parsing errors. Consider logging the exception at debug/warning level before returning an empty list to aid debugging when member assignment parsing fails unexpectedly.</violation>
</file>

<file name="backend/app/events/core/producer.py">

<violation number="1" location="backend/app/events/core/producer.py:33">
P2: The `stats_callback` parameter is now dead code. It's accepted and stored but never invoked since the `_handle_stats` method was removed. Either remove this parameter or implement statistics collection for aiokafka.</violation>
</file>

<file name="backend/app/services/idempotency/middleware.py">

<violation number="1" location="backend/app/services/idempotency/middleware.py:238">
P0: Change `message.topic()` to `message.topic` - in aiokafka's ConsumerRecord, `topic` is a property, not a method. The parentheses will cause a TypeError at runtime.</violation>
</file>

<file name="backend/scripts/create_topics.py">

<violation number="1" location="backend/scripts/create_topics.py:79">
P2: Error handling is less granular than the original implementation. If TopicAlreadyExistsError is raised, it's unclear which topics (if any) were successfully created. Consider querying topic existence after the error to determine actual creation status, or catching the error per-topic if aiokafka supports it.</violation>
</file>

<file name="backend/app/services/event_bus.py">

<violation number="1" location="backend/app/services/event_bus.py:135">
P2: Behavioral change: `send_and_wait()` blocks until Kafka acknowledges the message, unlike the previous fire-and-forget pattern with `produce()` + `poll(0)`. This makes publishing synchronous and potentially slower. Consider using `send()` without waiting if fire-and-forget behavior is desired, or document this intentional change for increased reliability.</violation>
</file>

<file name="backend/app/events/schema/schema_registry.py">

<violation number="1" location="backend/app/events/schema/schema_registry.py:76">
P0: Missing Schema Registry authentication configuration. The old code passed `settings.SCHEMA_REGISTRY_AUTH` to the client, but the new `AsyncSchemaRegistryClient` initialization doesn't include authentication credentials. This will cause authentication failures in production environments that require credentials to access the Schema Registry.</violation>
</file>

<file name="backend/app/dlq/manager.py">

<violation number="1" location="backend/app/dlq/manager.py:524">
P1: Missing `enable_idempotence=True` parameter. The original confluent_kafka producer had idempotence enabled for exactly-once delivery semantics. This should be preserved in the aiokafka migration to prevent duplicate messages during retries.</violation>
</file>

Reply with feedback, questions, or to request a fix. Tag @cubic-dev-ai to re-run a review.


@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 2

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (3)
backend/app/services/user_settings_service.py (2)

187-209: Missing cross-instance cache invalidation for restore_settings_to_point.

This method updates the local cache and publishes to event_service, but does not publish to the event bus for cross-instance cache invalidation (unlike update_user_settings at lines 112-114). Other instances will retain stale cached data after a restore operation.

🐛 Proposed fix: Add event bus publish after cache update
         await self.repository.create_snapshot(settings)
         self._add_to_cache(user_id, settings)
 
+        if self._event_bus_manager is not None:
+            bus = await self._event_bus_manager.get_event_bus()
+            await bus.publish("user.settings.updated", {"user_id": user_id, "source_instance": self._instance_id})
+
         await self.event_service.publish_event(
             event_type=EventType.USER_SETTINGS_UPDATED,

245-249: Missing cross-instance cache invalidation for reset_user_settings.

Similar to restore_settings_to_point, this method modifies settings state but does not notify other instances via the event bus. After a reset, other instances will continue serving stale cached settings.

🐛 Proposed fix: Add event bus publish after cache invalidation
     async def reset_user_settings(self, user_id: str) -> None:
         """Reset user settings by deleting all data and cache."""
         await self.invalidate_cache(user_id)
         await self.repository.delete_user_settings(user_id)
+        if self._event_bus_manager is not None:
+            bus = await self._event_bus_manager.get_event_bus()
+            await bus.publish("user.settings.updated", {"user_id": user_id, "source_instance": self._instance_id})
         self.logger.info(f"Reset settings for user {user_id}")
backend/app/services/idempotency/middleware.py (1)

209-209: Stale comment: references confluent-kafka instead of aiokafka.

The comment still references "confluent-kafka Message" but the PR migrates to aiokafka.

Suggested fix
-            # Extract event from confluent-kafka Message
+            # Extract event from Kafka Message
🤖 Fix all issues with AI agents
In @backend/app/events/consumer_group_monitor.py:
- Around line 345-363: The call to admin.describe_consumer_groups in
_describe_consumer_group will fail because aiokafka v0.13.0 does not implement
that method; either update the aiokafka dependency in pyproject.toml to a
version that exposes describe_consumer_groups, or replace the call with a
supported flow: use admin.list_consumer_groups to locate the group (or
admin.find_coordinator to get the broker for the group) and then query group
details via the supported APIs (list_consumer_groups/find_coordinator + the
existing response-parsing logic used by _parse_describe_groups_response) to
produce a DescribedGroup; update the code in _describe_consumer_group to use
list_consumer_groups/find_coordinator instead of describe_consumer_groups, or
bump the aiokafka version accordingly.

In @backend/scripts/create_topics.py:
- Around line 10-11: The import of NewTopic is wrong: aiokafka.admin doesn't
export NewTopic so replace the import to use NewTopic from kafka.admin and add
kafka-python to project dependencies (pyproject.toml); update the NewTopic
construction call where NewTopic(...) is used (see usage around the
AIOKafkaAdminClient code) to pass config= instead of topic_configs= so it
matches kafka-python's NewTopic API; ensure any references to topic_configs are
renamed to config and that NewTopic is imported as: from kafka.admin import
NewTopic while leaving AIOKafkaAdminClient and TopicAlreadyExistsError imports
as-is.
🧹 Nitpick comments (4)
backend/tests/integration/dlq/test_dlq_manager.py (1)

46-55: Use test_settings.KAFKA_BOOTSTRAP_SERVERS instead of hardcoded value.

The producer uses a hardcoded "localhost:9092" while the rest of the test infrastructure relies on test_settings. This could cause test failures in environments where Kafka runs on a different host/port.

♻️ Proposed fix
-    producer = AIOKafkaProducer(bootstrap_servers="localhost:9092")
+    producer = AIOKafkaProducer(bootstrap_servers=test_settings.KAFKA_BOOTSTRAP_SERVERS)
backend/app/events/core/producer.py (1)

67-74: Add missing request_timeout_ms and enable_idempotence configuration to AIOKafkaProducer initialization.

The ProducerConfig defines request_timeout_ms and enable_idempotence settings (lines 43-45 in types.py), but these are not passed to the AIOKafkaProducer. Both parameters are supported by aiokafka and should be applied.

Note: The retries field from ProducerConfig cannot be directly used with aiokafka—it automatically retries failed requests until request_timeout_ms expires. The max_in_flight_requests_per_connection setting is also not supported by aiokafka, which enforces one batch per partition at a time.

♻️ Add timeout and idempotence configuration
 self._producer = AIOKafkaProducer(
     bootstrap_servers=self._config.bootstrap_servers,
     client_id=self._config.client_id,
     acks=self._config.acks,
     compression_type=self._config.compression_type,
     max_batch_size=self._config.batch_size,
     linger_ms=self._config.linger_ms,
+    request_timeout_ms=self._config.request_timeout_ms,
+    enable_idempotence=self._config.enable_idempotence,
 )
backend/app/events/consumer_group_monitor.py (1)

107-116: Silent exception swallowing may hide parsing bugs.

The bare except Exception returns an empty list without logging. Consider logging at debug level to aid troubleshooting malformed assignment data.

Suggested improvement
     try:
         assignment = MemberAssignment.decode(assignment_bytes)
         return [(topic, list(partitions)) for topic, partitions in assignment.assignment]
-    except Exception:
+    except Exception as e:
+        # Log at debug level - malformed assignments may occur during rebalancing
+        import logging
+        logging.getLogger(__name__).debug(f"Failed to parse member assignment: {e}")
         return []
backend/scripts/create_topics.py (1)

23-30: Consider adding a connection timeout.

The start() call could hang indefinitely if Kafka brokers are unreachable. For a setup script, you might want to add a timeout to fail fast.

💡 Suggested improvement
     try:
-        await admin_client.start()
+        await asyncio.wait_for(admin_client.start(), timeout=30.0)
         logger.info(f"Connected to Kafka brokers: {settings.KAFKA_BOOTSTRAP_SERVERS}")

You'd also need to catch asyncio.TimeoutError and log an appropriate error message.

📜 Review details

Configuration used: defaults

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 48918af and 92fd5ad.

⛔ Files ignored due to path filters (1)
  • backend/uv.lock is excluded by !**/*.lock
📒 Files selected for processing (18)
  • backend/app/dlq/manager.py
  • backend/app/events/admin_utils.py
  • backend/app/events/consumer_group_monitor.py
  • backend/app/events/core/consumer.py
  • backend/app/events/core/producer.py
  • backend/app/events/schema/schema_registry.py
  • backend/app/services/event_bus.py
  • backend/app/services/idempotency/middleware.py
  • backend/app/services/user_settings_service.py
  • backend/pyproject.toml
  • backend/scripts/create_topics.py
  • backend/tests/integration/dlq/test_dlq_manager.py
  • backend/tests/integration/events/test_admin_utils.py
  • backend/tests/integration/events/test_consumer_group_monitor_real.py
  • backend/tests/integration/events/test_schema_registry_real.py
  • backend/tests/integration/events/test_schema_registry_roundtrip.py
  • backend/tests/integration/services/coordinator/test_execution_coordinator.py
  • backend/tests/unit/events/core/test_producer.py
💤 Files with no reviewable changes (3)
  • backend/tests/integration/events/test_admin_utils.py
  • backend/tests/unit/events/core/test_producer.py
  • backend/app/events/admin_utils.py
🧰 Additional context used
🧬 Code graph analysis (10)
backend/app/services/user_settings_service.py (1)
backend/app/services/event_bus.py (4)
  • get_event_bus (313-319)
  • get_event_bus (329-331)
  • EventBusEvent (21-29)
  • publish (118-144)
backend/tests/integration/events/test_schema_registry_roundtrip.py (1)
backend/app/events/schema/schema_registry.py (3)
  • serialize_event (132-153)
  • deserialize_event (155-182)
  • initialize_schemas (222-233)
backend/app/services/idempotency/middleware.py (1)
backend/app/events/schema/schema_registry.py (1)
  • deserialize_event (155-182)
backend/tests/integration/dlq/test_dlq_manager.py (3)
backend/app/events/core/producer.py (1)
  • producer (59-60)
backend/tests/helpers/kafka.py (1)
  • producer (10-13)
backend/app/domain/enums/kafka.py (1)
  • KafkaTopic (7-53)
backend/tests/integration/services/coordinator/test_execution_coordinator.py (1)
backend/tests/helpers/eventually.py (1)
  • eventually (8-36)
backend/app/dlq/manager.py (3)
backend/app/events/core/producer.py (2)
  • producer (59-60)
  • _on_start (62-78)
backend/app/dlq/models.py (1)
  • DLQMessage (30-55)
backend/app/domain/enums/kafka.py (1)
  • GroupId (56-68)
backend/app/events/schema/schema_registry.py (2)
backend/app/services/kafka_event_service.py (1)
  • close (296-298)
backend/tests/unit/services/saga/test_saga_orchestrator_unit.py (1)
  • close (79-80)
backend/app/services/event_bus.py (3)
backend/app/events/core/producer.py (1)
  • producer (59-60)
backend/app/events/core/consumer.py (3)
  • consumer (211-212)
  • start (47-72)
  • stop (74-89)
backend/app/services/idempotency/middleware.py (2)
  • start (263-271)
  • stop (273-275)
backend/app/events/core/producer.py (5)
backend/app/events/core/types.py (1)
  • ProducerConfig (31-62)
backend/app/events/schema/schema_registry.py (1)
  • serialize_event (132-153)
backend/app/events/core/consumer.py (1)
  • start (47-72)
backend/app/core/metrics/events.py (1)
  • record_kafka_message_produced (179-182)
backend/app/domain/enums/kafka.py (1)
  • KafkaTopic (7-53)
backend/app/events/core/consumer.py (1)
backend/app/events/schema/schema_registry.py (1)
  • deserialize_event (155-182)
🪛 GitHub Actions: MyPy Type Checking
backend/app/events/schema/schema_registry.py

[error] 7-7: mypy error: Module 'schema_registry.serializers' does not explicitly export attribute 'AsyncAvroMessageSerializer'.

⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (4)
  • GitHub Check: Scan Backend
  • GitHub Check: cubic · AI code reviewer
  • GitHub Check: E2E Tests
  • GitHub Check: Integration Tests
🔇 Additional comments (33)
backend/tests/integration/services/coordinator/test_execution_coordinator.py (1)

17-21: LGTM!

The refactored test correctly uses the eventually helper to poll for the async background scheduling behavior. The timeout of 2.0s with a 0.05s interval provides adequate retries (up to 40) for the coordinator's background loop to mark the execution as active. This approach properly aligns with the async architecture migration.
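
For context, a polling helper of this shape would support that pattern (a hypothetical reconstruction; the real helper lives in backend/tests/helpers/eventually.py and its exact signature may differ):

import asyncio
import time

async def eventually(check, timeout: float = 2.0, interval: float = 0.05) -> None:
    # Re-run `check` (a callable containing assertions, sync or async) until it
    # passes or the timeout elapses; the last failure is re-raised.
    deadline = time.monotonic() + timeout
    while True:
        try:
            result = check()
            if asyncio.iscoroutine(result):
                await result
            return
        except AssertionError:
            if time.monotonic() >= deadline:
                raise
            await asyncio.sleep(interval)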

backend/app/services/user_settings_service.py (3)

42-42: LGTM!

Using uuid4() for instance identification is a solid approach for cross-instance event deduplication. The ID is generated once at initialization, ensuring consistent identity throughout the service's lifetime.


63-70: LGTM!

The cross-instance guard correctly filters self-originated events to prevent redundant cache invalidation. Using .get() gracefully handles events without source_instance, allowing backward compatibility with events that don't include this field.
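
For illustration, the guard amounts to something like the following (field and parameter names are assumptions, not the service's exact code):

def should_handle(event_payload: dict, instance_id: str) -> bool:
    # Events published by this instance are skipped: the local cache was
    # already updated before publishing, so only cross-instance events matter.
    # A missing source_instance field falls through to "handle it".
    return event_payload.get("source_instance") != instance_id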


112-114: LGTM!

Including source_instance in the event payload correctly enables the cross-instance guard mechanism. The event is published after persistence operations, ensuring consistency.

backend/app/events/core/producer.py (1)

128-161: Good migration to async produce pattern.

The error handling properly catches KafkaError, updates metrics, and re-raises the exception. The send_and_wait usage ensures the message is acknowledged before returning.

backend/app/events/core/consumer.py (2)

106-127: Consume loop implementation looks correct.

The pattern of using asyncio.wait_for with getone() and handling TimeoutError to continue polling is appropriate for aiokafka. The small sleep on timeout prevents busy-waiting.
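
A minimal sketch of that loop shape, assuming an already-started AIOKafkaConsumer and an async handler (not the project's exact code):

import asyncio

from aiokafka import AIOKafkaConsumer
from aiokafka.errors import KafkaError

async def consume_loop(consumer: AIOKafkaConsumer, handle, is_running) -> None:
    while is_running():
        try:
            msg = await asyncio.wait_for(consumer.getone(), timeout=0.1)
        except asyncio.TimeoutError:
            # No message within the poll window; a brief sleep avoids busy-waiting.
            await asyncio.sleep(0.01)
            continue
        except KafkaError:
            # Count/log the error and keep the loop alive.
            continue
        await handle(msg)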


257-258: seek() is synchronous in aiokafka—no await needed.

In aiokafka, seek() is a synchronous method that sets the fetch offset for the next getone()/getmany() call, which is correct here. Note that seek_to_end() and similar methods are async and require await, but seek() does not.
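
In code, the distinction looks like this (sketch):

from aiokafka import AIOKafkaConsumer
from aiokafka.structs import TopicPartition

async def rewind(consumer: AIOKafkaConsumer, tp: TopicPartition, offset: int) -> None:
    consumer.seek(tp, offset)  # synchronous: only records the next fetch position
    # By contrast, the bulk helpers are coroutines and must be awaited, e.g.:
    # await consumer.seek_to_end(tp)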

backend/app/services/event_bus.py (2)

82-90: Unique group_id per consumer instance enables fan-out semantics.

Using uuid4() in the group_id ensures each EventBus instance receives all messages (fan-out), which is appropriate for a local event distribution pattern. This is a deliberate design choice.
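
A sketch of that broadcast-style subscription (parameter names assumed):

from uuid import uuid4

from aiokafka import AIOKafkaConsumer

def build_fanout_consumer(bootstrap_servers: str, topic: str) -> AIOKafkaConsumer:
    # A unique consumer group per instance means every instance receives every
    # message, turning the topic into a broadcast channel for the EventBus.
    return AIOKafkaConsumer(
        topic,
        bootstrap_servers=bootstrap_servers,
        group_id=f"event-bus-{uuid4()}",
        enable_auto_commit=True,
        auto_offset_reset="latest",
    )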


262-279: Kafka listener correctly handles async message consumption.

The pattern properly catches asyncio.TimeoutError for no-message conditions and KafkaError for consumer issues. Message deserialization and distribution are handled in a try block with appropriate error logging.

backend/app/dlq/manager.py (3)

150-160: Start order: producer before consumer is correct for DLQ processing.

Starting the producer before the consumer ensures retry/forward operations are ready before message consumption begins. The _on_stop correctly stops consumer first (line 174) before producer (line 175).


197-205: Poll timeout of 1.0s is reasonable for DLQ processing.

The longer timeout (1.0s vs 0.1s in other consumers) is appropriate for the DLQ manager which doesn't require real-time responsiveness. The KafkaError handling logs and returns None to continue the loop.


327-341: The dual-send pattern is intentional for separating monitoring from reprocessing; the retry topic is not consumed by any service.

The code correctly sends the event to both the retry topic (for monitoring/observability) and the original topic (for reprocessing). However, the concern about downstream consumers handling duplicates is not applicable—there are no services in the codebase that subscribe to retry topics. The architecture intentionally separates the monitoring path (retry topic) from the actual reprocessing path (original topic), making the dual-send design sound as-is.

Likely an incorrect or invalid review comment.

backend/app/events/schema/schema_registry.py (3)

73-83: Lazy initialization pattern is appropriate for async clients.

The _get_client() and _get_serializer() methods properly handle lazy initialization. This avoids issues with creating async resources before the event loop is running.
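
The pattern is roughly the following (an illustrative sketch, not the project's exact wiring; auth handling omitted):

from schema_registry.client import AsyncSchemaRegistryClient
from schema_registry.serializers import AsyncAvroMessageSerializer  # type: ignore[attr-defined]

class LazyRegistry:
    def __init__(self, url: str) -> None:
        self._url = url
        self._client: AsyncSchemaRegistryClient | None = None
        self._serializer: AsyncAvroMessageSerializer | None = None

    def _get_client(self) -> AsyncSchemaRegistryClient:
        # Created on first use, i.e. only once an event loop is already running.
        if self._client is None:
            self._client = AsyncSchemaRegistryClient(url=self._url)
        return self._client

    def _get_serializer(self) -> AsyncAvroMessageSerializer:
        if self._serializer is None:
            self._serializer = AsyncAvroMessageSerializer(self._get_client())
        return self._serializer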


235-238: No explicit cleanup needed for AsyncSchemaRegistryClient.

AsyncSchemaRegistryClient from the python-schema-registry-client library is a stateless HTTP client with no close() or aclose() method. The current implementation of clearing references is sufficient.


6-7: No issues found. The import of AsyncAvroMessageSerializer from schema_registry.serializers is valid and functional. Tests that depend on this import execute successfully, and the MyPy configuration explicitly disables import-not-found errors. There is no evidence of a pipeline failure.

backend/pyproject.toml (1)

26-27: The dependency versions are appropriate. python-schema-registry-client==2.6.1 includes AsyncAvroMessageSerializer and async support, and aiokafka==0.13.0 is a stable release. Both libraries support the async functionality required for this migration.

backend/app/services/idempotency/middleware.py (1)

238-238: LGTM!

The await is correctly added to match the now-async deserialize_event method signature. The surrounding async context properly supports this change.
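
For reference, the resulting access pattern looks roughly like this (function name and the deserialize_event signature are assumptions):

async def extract_event(message, schema_registry):
    raw: bytes = message.value   # aiokafka ConsumerRecord attribute, not a call
    topic: str = message.topic   # likewise a property, not message.topic()
    return await schema_registry.deserialize_event(raw, topic)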

backend/tests/integration/events/test_schema_registry_real.py (1)

14-27: LGTM!

The test correctly migrates to async pattern with @pytest.mark.asyncio decorator and properly awaits the now-async serialize_event and deserialize_event methods. The test logic remains unchanged and valid.

backend/tests/integration/events/test_schema_registry_roundtrip.py (2)

20-22: LGTM!

The await calls are correctly added to match the async method signatures.


29-33: LGTM!

The test is correctly converted to async with proper decorator and await usage for the invalid header test case.

backend/tests/integration/events/test_consumer_group_monitor_real.py (4)

5-10: LGTM!

The import of ConsumerGroupState is correctly added to support enum-based state assertions throughout the test file.


23-29: LGTM!

The test correctly adapts to the new API: removed timeout parameter and uses enum values for state comparison. The health summary assertion properly uses .value for string comparison.


34-74: LGTM!

Comprehensive coverage of _assess_group_health branches using the new ConsumerGroupState enum values. All state transitions (UNKNOWN, DEAD, STABLE, PREPARING_REBALANCE, COMPLETING_REBALANCE, EMPTY) are properly tested.


100-106: LGTM!

The test correctly uses the updated get_multiple_group_status signature without the timeout parameter.

backend/app/events/consumer_group_monitor.py (6)

26-36: LGTM!

The ConsumerGroupState enum properly models Kafka protocol states with appropriate string values matching the Kafka wire format.
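
The states correspond to the strings Kafka itself reports; the enum is roughly the following (shown on a plain str Enum rather than the project's StringEnum base):

from enum import Enum

class ConsumerGroupState(str, Enum):
    UNKNOWN = "Unknown"
    PREPARING_REBALANCE = "PreparingRebalance"
    COMPLETING_REBALANCE = "CompletingRebalance"
    STABLE = "Stable"
    DEAD = "Dead"
    EMPTY = "Empty"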


160-174: LGTM!

Lazy initialization pattern for the admin client is appropriate. The close() method properly cleans up the resource.

Consider implementing __aenter__/__aexit__ for async context manager support if this class will be used in async with patterns.


87-104: LGTM!

The _parse_describe_groups_response function correctly extracts group metadata from the raw response object. The typed DescribedGroup dataclass improves code clarity.


413-456: LGTM!

The _assess_group_health method provides comprehensive health evaluation covering all ConsumerGroupState values, lag thresholds, and partition distribution. The logic is sound and aligns with the test coverage.


232-238: Lag is only calculated for STABLE groups.

This is intentional behavior, but note that groups in EMPTY state will report total_lag=0 even if committed offsets exist and lag is accumulating. Consider whether lag monitoring should extend to EMPTY groups for alerting purposes.


376-407: Temporary consumer creation is necessary, but verify if lag monitoring is called frequently in production.

The current implementation uses AIOKafkaConsumer.end_offsets() to retrieve high watermarks. AIOKafkaAdminClient does not expose this capability, so a temporary consumer is required. If _get_consumer_group_lag() is called frequently during monitoring, consider optimizing the frequency of lag checks rather than the consumer lifecycle, or batch lag calculations across multiple consumer groups.

Likely an incorrect or invalid review comment.
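
For reference, reading high watermarks with a short-lived consumer looks roughly like this (sketch; committed offsets, the other half of the lag calculation, come from the group metadata and are not shown):

from aiokafka import AIOKafkaConsumer
from aiokafka.structs import TopicPartition

async def high_watermarks(bootstrap_servers: str, partitions: list[TopicPartition]) -> dict[TopicPartition, int]:
    consumer = AIOKafkaConsumer(bootstrap_servers=bootstrap_servers, enable_auto_commit=False)
    await consumer.start()
    try:
        # end_offsets() is the consumer-side API for high watermarks; the admin
        # client does not expose an equivalent.
        return await consumer.end_offsets(partitions)
    finally:
        await consumer.stop()

# lag per partition = high watermark - committed offset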

backend/scripts/create_topics.py (3)

33-72: Good filtering and construction logic.

Converting to a set for O(1) lookups and filtering existing topics before creation is good practice. This reduces unnecessary API calls and handles the common case efficiently.


74-83: Appropriate race condition handling.

Catching TopicAlreadyExistsError handles the edge case where another process creates the topic between the existence check and creation attempt.

Minor note: if this exception is raised, the per-topic success logs (lines 77-78) are skipped. This is acceptable for a setup script but means you won't know exactly which topics were created vs. already existed in the race condition case.


94-112: Proper async cleanup and entry point.

The finally block ensures the admin client is always closed, and asyncio.run() is the correct way to run the async entry point. Error handling with sys.exit(1) on failure is appropriate for a setup script.


@cubic-dev-ai cubic-dev-ai bot left a comment


1 issue found across 5 files (changes from recent commits).

Prompt for AI agents (all issues)

Check if these issues are valid — if so, understand the root cause of each and fix them.


<file name="backend/app/services/event_bus.py">

<violation number="1" location="backend/app/services/event_bus.py:273">
P1: Header decoding can crash the Kafka listener if a malformed message is received. Move the header processing inside the try-catch block to prevent a single bad message from terminating the entire listener.</violation>
</file>

Reply with feedback, questions, or to request a fix. Tag @cubic-dev-ai to re-run a review.


@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 1

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
backend/app/services/event_bus.py (1)

98-122: Potential resource leak if producer.stop() fails before consumer cleanup.

If self.consumer.stop() raises an exception, self.producer.stop() will not be called. Consider using try/finally or stopping both independently.

Proposed fix to ensure both resources are cleaned up
     async def _on_stop(self) -> None:
         """Stop the event bus and clean up resources."""
         # Cancel consumer task
         if self._consumer_task and not self._consumer_task.done():
             self._consumer_task.cancel()
             try:
                 await self._consumer_task
             except asyncio.CancelledError:
                 pass
 
         # Stop Kafka components
-        if self.consumer:
-            await self.consumer.stop()
-            self.consumer = None
-
-        if self.producer:
-            await self.producer.stop()
-            self.producer = None
+        try:
+            if self.consumer:
+                await self.consumer.stop()
+        except Exception as e:
+            self.logger.error(f"Error stopping consumer: {e}")
+        finally:
+            self.consumer = None
+
+        try:
+            if self.producer:
+                await self.producer.stop()
+        except Exception as e:
+            self.logger.error(f"Error stopping producer: {e}")
+        finally:
+            self.producer = None
 
         # Clear subscriptions
         async with self._lock:
🤖 Fix all issues with AI agents
In @backend/app/services/event_bus.py:
- Around line 259-288: The _kafka_listener loop currently reads self.is_running
without synchronization and assumes msg.value is non-None; fix by using a
dedicated asyncio.Event (e.g., self._stop_event) that _on_stop sets so the loop
waits/checks atomically, and guard against tombstone records by checking if
msg.value is None before decoding (skip or log and continue). Concretely,
replace the raw self.is_running loop condition in _kafka_listener with a pattern
that uses self._stop_event.is_set() or awaits it with timeout to decide
continuation, ensure _on_stop sets that event, and add an explicit if msg.value
is None: log/continue before calling msg.value.decode() and json.loads().
🧹 Nitpick comments (1)
docs/architecture/event-bus.md (1)

86-104: Minor: Usage example accesses _event_bus_manager which isn't shown being set.

The example shows self._event_bus_manager.get_event_bus() at line 102 but the service only receives event_bus_manager in initialize(). Consider showing the assignment or using the passed parameter for clarity.

Suggested clarification
 class MyService:
+    def __init__(self) -> None:
+        self._event_bus_manager: EventBusManager | None = None
+
     async def initialize(self, event_bus_manager: EventBusManager) -> None:
+        self._event_bus_manager = event_bus_manager
         bus = await event_bus_manager.get_event_bus()
📜 Review details

Configuration used: defaults

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 92fd5ad and 1e4c16e.

📒 Files selected for processing (5)
  • backend/app/services/event_bus.py
  • backend/app/services/user_settings_service.py
  • docs/architecture/event-bus.md
  • docs/architecture/user-settings-events.md
  • mkdocs.yml
🚧 Files skipped from review as they are similar to previous changes (1)
  • backend/app/services/user_settings_service.py
🧰 Additional context used
🧬 Code graph analysis (1)
backend/app/services/event_bus.py (1)
backend/app/domain/enums/kafka.py (1)
  • KafkaTopic (7-53)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (3)
  • GitHub Check: cubic · AI code reviewer
  • GitHub Check: E2E Tests
  • GitHub Check: Integration Tests
🔇 Additional comments (11)
backend/app/services/event_bus.py (4)

10-11: LGTM!

Imports correctly updated from confluent-kafka to aiokafka with the appropriate error class.


137-150: LGTM!

The publish flow correctly includes the source_instance header for self-filtering and uses send_and_wait for reliable delivery with proper error handling.
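
A minimal sketch of that publish shape (topic and payload handling simplified; names assumed):

import json

from aiokafka import AIOKafkaProducer
from aiokafka.errors import KafkaError

async def publish(producer: AIOKafkaProducer, topic: str, event: dict, instance_id: str) -> None:
    try:
        await producer.send_and_wait(
            topic,
            json.dumps(event).encode("utf-8"),
            # aiokafka headers are (str, bytes) tuples; the consumer side uses
            # this header to drop its own messages.
            headers=[("source_instance", instance_id.encode("utf-8"))],
        )
    except KafkaError:
        # Logged and re-raised in the real implementation.
        raise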


268-275: LGTM on self-filtering implementation.

The self-filtering logic correctly extracts the source_instance header and skips messages from the same instance. The defensive handling of missing headers (b"" default) is appropriate.
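
The filter itself reduces to a few lines (sketch):

def is_self_published(msg, instance_id: str) -> bool:
    # msg.headers is a sequence of (key, bytes) pairs; a missing header maps to b"".
    headers = dict(msg.headers or ())
    source = headers.get("source_instance", b"").decode("utf-8")
    return source == instance_id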


79-96: Design is intentional — unique group_id enables broadcast pattern, but verify group cleanup policy.

Each EventBus instance uses a unique consumer group (event-bus-{uuid4()}) by design, allowing all instances to receive all messages from the topic. The class docstring confirms this is the intended cross-instance event distribution pattern: "Subscribers receive events from OTHER instances only." Each instance filters self-published messages using a unique _instance_id.

However, orphaned consumer groups will accumulate over time when instances restart. If your Kafka broker's group cleanup policy (offsets.retention.minutes) is not explicitly configured, expired groups may persist indefinitely. Confirm your broker-side cleanup settings handle this, or implement explicit group deletion on instance shutdown using AdminClient.

mkdocs.yml (1)

122-122: LGTM!

Navigation entry correctly added in logical order within the Architecture section.

docs/architecture/event-bus.md (3)

1-6: LGTM!

Clear problem statement that explains the need for cross-instance communication in horizontally scaled deployments.


55-81: LGTM!

Excellent documentation of the self-filtering mechanism. The code snippets accurately reflect the implementation in event_bus.py, and the three-point summary clearly explains the design contract.


158-162: LGTM!

Key files table correctly links to the implementation and example usage files.

docs/architecture/user-settings-events.md (3)

69-77: LGTM!

Documentation accurately describes the updated EventBus behavior where handlers only run for events from other instances, and local cache is updated directly before publishing.


92-100: LGTM!

Key files table and related docs section correctly updated to include the new EventBus documentation and cross-references.


23-40: Line numbers in documentation are accurate; no update needed.

The referenced line ranges in user_settings_service.py are correct:

  • Lines 22-23 contain the TypeAdapter definitions
  • Lines 91-118 contain the complete update_user_settings method
  • Lines 212-223 contain the complete _apply_event method

The --8<-- directives in the documentation are not stale and reference valid code sections.

Likely an incorrect or invalid review comment.


@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 2

🤖 Fix all issues with AI agents
In @backend/tests/integration/dlq/test_dlq_manager.py:
- Around line 45-55: Replace the hardcoded bootstrap server in the
AIOKafkaProducer instantiation with the test settings constant so the test uses
the same Kafka address as the DLQ manager: change
AIOKafkaProducer(bootstrap_servers="localhost:9092") to use
test_settings.KAFKA_BOOTSTRAP_SERVERS (keep the existing producer variable name
and surrounding send/stop logic, and do not modify topic/key/value code using
prefix, KafkaTopic.DEAD_LETTER_QUEUE, ev.event_id, or payload).

In @docs/components/schema-manager.md:
- Line 23: Update the docs to state that schema registry operations are
asynchronous: explicitly mention that SchemaRegistryManager performs async
operations and that serialize_event and deserialize_event are async methods that
must be awaited, and note that initialize_schemas() is an async method as well;
update the sentences around the existing mentions (including the line describing
serialization mechanics and the lines at ~19 and ~29) to include this
async/await requirement and a short example phrase like "async — must be
awaited" for clarity.
🧹 Nitpick comments (1)
backend/app/events/schema/schema_registry.py (1)

56-68: Consider validating auth format for edge cases.

The auth parsing logic works correctly for the common cases (empty string, valid username:password). However, malformed inputs like "user:" or ":pass" would produce tuples with empty elements that might cause unexpected behavior with the schema registry client.

Optional: Add validation for auth format
         parts = settings.SCHEMA_REGISTRY_AUTH.split(":", 1)
-        auth: tuple[str, str] | None = (parts[0], parts[1]) if len(parts) == 2 else None
+        auth: tuple[str, str] | None = None
+        if len(parts) == 2 and parts[0] and parts[1]:
+            auth = (parts[0], parts[1])
         self._client = AsyncSchemaRegistryClient(url=settings.SCHEMA_REGISTRY_URL, auth=auth)  # type: ignore[arg-type]
📜 Review details

Configuration used: defaults

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 1e4c16e and 3c82f9d.

📒 Files selected for processing (7)
  • backend/app/core/providers.py
  • backend/app/events/schema/schema_registry.py
  • backend/app/services/idempotency/middleware.py
  • backend/app/settings.py
  • backend/tests/integration/dlq/test_dlq_manager.py
  • docs/components/schema-manager.md
  • docs/reference/environment-variables.md
🚧 Files skipped from review as they are similar to previous changes (1)
  • backend/app/services/idempotency/middleware.py
🧰 Additional context used
🧬 Code graph analysis (2)
backend/app/core/providers.py (1)
backend/app/events/schema/schema_registry.py (1)
  • SchemaRegistryManager (50-197)
backend/tests/integration/dlq/test_dlq_manager.py (3)
backend/app/dlq/manager.py (1)
  • create_dlq_manager (507-543)
backend/app/events/schema/schema_registry.py (1)
  • SchemaRegistryManager (50-197)
backend/app/events/core/producer.py (1)
  • producer (59-60)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (3)
  • GitHub Check: cubic · AI code reviewer
  • GitHub Check: Integration Tests
  • GitHub Check: E2E Tests
🔇 Additional comments (17)
docs/reference/environment-variables.md (1)

50-50: LGTM!

Documentation correctly reflects the updated default for SCHEMA_REGISTRY_AUTH from *none* to "", consistent with the settings change.

backend/app/settings.py (1)

54-54: LGTM!

The change from str | None = None to str = "" aligns with the async schema registry's auth parsing logic. An empty string correctly results in auth = None when split by :, maintaining backward compatibility.

backend/app/core/providers.py (2)

49-49: LGTM!

Import updated to use the class directly, consistent with the removal of the factory function from schema_registry.py.


193-195: LGTM!

Direct instantiation of SchemaRegistryManager is appropriate here. The constructor is synchronous, and async methods like initialize_schemas() are invoked later by consumers of this provider.

backend/tests/integration/dlq/test_dlq_manager.py (2)

7-12: LGTM!

Imports correctly updated to use AIOKafkaProducer and direct SchemaRegistryManager instantiation, aligning with the async migration.


27-29: LGTM!

Schema registry instantiation correctly uses the new constructor signature with test_settings and logger.

backend/app/events/schema/schema_registry.py (11)

19-47: LGTM!

Helper functions with lru_cache are well-designed for caching event class mappings. The single-level subclass iteration is documented with notes about extending for deeper hierarchies if needed.


70-77: LGTM!

The register_schema method correctly handles async registration, updates both directional caches, and logs the operation.


79-84: LGTM!

Cache-first lookup with fallback to registration is the correct pattern.


86-105: LGTM!

The method correctly handles cache lookup, async registry fetch, schema parsing, and bidirectional cache updates. Returning None for unmapped schemas is appropriate for the caller to handle.


107-123: LGTM with a note on datetime handling.

The serialization logic correctly handles the Confluent wire format. The timestamp conversion to microseconds (lines 120-121) is appropriate for Avro's timestamp-micros logical type.

Note: Only the timestamp field is explicitly converted. If other datetime fields exist in event subclasses, they may need similar handling depending on their Avro schema definitions.
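
For context, the Confluent wire format mentioned above is one magic byte (0x00), a 4-byte big-endian schema ID, and then the Avro payload; timestamp-micros simply stores datetimes as integer microseconds since the epoch. A header check might look like this (illustrative):

import struct

def split_wire_format(data: bytes) -> tuple[int, bytes]:
    if len(data) < 5 or data[0] != 0:
        raise ValueError("not in Confluent wire format")
    (schema_id,) = struct.unpack(">I", data[1:5])
    return schema_id, data[5:]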


125-151: LGTM!

Deserialization correctly validates wire format, extracts schema ID, resolves the event class, and intelligently restores the event_type field from the model's default when not present in the payload.


153-168: LGTM!

The synchronous deserialize_json method is appropriate for JSON data sources (MongoDB, DLQ) that don't require schema registry interaction. Error handling for missing or unknown event types is correct.


170-188: LGTM!

Compatibility mode validation is comprehensive, and the async client call is correct.


190-197: LGTM!

Schema initialization with FORWARD compatibility is appropriate for evolving event schemas. Sequential processing ensures deterministic startup behavior.


200-201: LGTM!

Convenience wrapper provides a clean module-level API for startup initialization.


6-7: LGTM!

Imports correctly use the async classes from python-schema-registry-client (v2.6.1). AsyncSchemaRegistryClient and schema (used on lines 72, 113) are properly imported, and AsyncAvroMessageSerializer is instantiated and used throughout the class (lines 64, 123, 142). The # type: ignore[attr-defined] on line 7 is appropriate—the serializers module lacks complete type stubs in this version.


@cubic-dev-ai cubic-dev-ai bot left a comment


3 issues found across 7 files (changes from recent commits).

Prompt for AI agents (all issues)

Check if these issues are valid — if so, understand the root cause of each and fix them.


<file name="backend/app/services/idempotency/middleware.py">

<violation number="1" location="backend/app/services/idempotency/middleware.py:238">
P1: Incomplete aiokafka migration: `message.value()` should be `message.value` (property access). The change on line 238 correctly updates `topic()` to `topic`, but line 221 still incorrectly calls `value()` as a method, which will fail at runtime since aiokafka's ConsumerRecord.value is a property.</violation>
</file>

<file name="backend/app/core/providers.py">

<violation number="1" location="backend/app/core/providers.py:195">
P1: Missing lifecycle management for SchemaRegistryManager's async client. The AsyncSchemaRegistryClient uses aiohttp ClientSession internally, which requires proper cleanup to avoid resource leaks and "unclosed client session" warnings. This provider method should return AsyncIterator[SchemaRegistryManager] and use an async context manager pattern like other async Kafka components in this file.</violation>
</file>

<file name="backend/app/events/schema/schema_registry.py">

<violation number="1" location="backend/app/events/schema/schema_registry.py:61">
P1: Missing validation for SCHEMA_REGISTRY_AUTH format. Invalid formats like "username" or "user:" are silently accepted, potentially causing authentication failures that are hard to debug.</violation>
</file>

Reply with feedback, questions, or to request a fix. Tag @cubic-dev-ai to re-run a review.


@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 0

🧹 Nitpick comments (7)
backend/app/events/core/consumer.py (1)

104-124: Consider using getmany() for more efficient batch processing.

The current implementation uses getone() with a short timeout, which works but may be less efficient under high load. The getmany() method can retrieve multiple messages in one call, reducing overhead.

Additionally, the except KafkaError block increments processing_errors but doesn't propagate or handle the error further—this is acceptable for resilience, but consider whether certain Kafka errors (e.g., authorization failures) should trigger a reconnection or shutdown.

backend/app/events/consumer_group_monitor.py (2)

87-104: Add defensive handling for missing keys in response parsing.

The _parse_describe_groups_response function directly accesses dictionary keys without checking for their existence. If the Kafka response structure changes or contains unexpected data, this could raise KeyError.

♻️ Suggested defensive parsing
 def _parse_describe_groups_response(response: Response) -> list[DescribedGroup]:
     """Parse DescribeGroupsResponse into typed DescribedGroup objects."""
     obj = response.to_object()
-    groups_data: list[dict[str, Any]] = obj["groups"]
+    groups_data: list[dict[str, Any]] = obj.get("groups", [])

     result: list[DescribedGroup] = []
     for g in groups_data:
         result.append(
             DescribedGroup(
-                error_code=g["error_code"],
-                group_id=g["group"],
-                state=g["state"],
-                protocol_type=g["protocol_type"],
-                protocol=g["protocol"],
-                members=g["members"],
+                error_code=g.get("error_code", -1),
+                group_id=g.get("group", ""),
+                state=g.get("state", "Unknown"),
+                protocol_type=g.get("protocol_type", ""),
+                protocol=g.get("protocol", ""),
+                members=g.get("members", []),
             )
         )
     return result

381-411: Resource leak risk if consumer start fails after creation.

The temporary consumer for lag monitoring is created and started inside a try/finally, so if consumer.start() succeeds and end_offsets() later fails, the finally block still stops the consumer; an exception during consumer construction itself leaves nothing to clean up. The current pattern is correct.

One minor note: the unique group ID uses datetime.now().timestamp() which could theoretically collide in rapid succession. Consider using uuid.uuid4() for guaranteed uniqueness.

♻️ Use UUID for guaranteed unique group ID
+import uuid
+
 # Create a temporary consumer to get high watermarks
 consumer = AIOKafkaConsumer(
     bootstrap_servers=self._bootstrap_servers,
-    group_id=f"{group_id}-lag-monitor-{datetime.now().timestamp()}",
+    group_id=f"{group_id}-lag-monitor-{uuid.uuid4().hex[:8]}",
     enable_auto_commit=False,
     auto_offset_reset="earliest",
 )
backend/app/events/core/producer.py (1)

125-158: Consider catching broader exceptions in produce().

The except KafkaError block only catches Kafka-specific errors. However, the async schema registry serialization (serialize_event) could raise other exceptions (e.g., ValueError for schema issues). These would propagate unhandled, which may be intentional but worth noting.

Additionally, the metrics and error recording on lines 154-156 reconstruct the topic string instead of reusing the topic variable from line 129.

♻️ Reuse topic variable for consistency
         except KafkaError as e:
             self._metrics.messages_failed += 1
             self._metrics.last_error = str(e)
             self._metrics.last_error_time = datetime.now(timezone.utc)
             self._event_metrics.record_kafka_production_error(
-                topic=f"{self._topic_prefix}{str(event_to_produce.topic)}", error_type=type(e).__name__
+                topic=topic, error_type=type(e).__name__
             )
             self.logger.error(f"Failed to produce message: {e}")
             raise
backend/app/dlq/manager.py (2)

197-206: Consider catching ConsumerStoppedError for cleaner shutdown.

The polling uses getone() with asyncio.wait_for, which is correct for aiokafka. However, during shutdown, getone() may raise ConsumerStoppedError if the consumer is stopped while waiting. Currently this would be caught by KafkaError, but explicit handling would be cleaner.

♻️ Optional improvement
 from aiokafka import AIOKafkaConsumer, AIOKafkaProducer
-from aiokafka.errors import KafkaError
+from aiokafka.errors import ConsumerStoppedError, KafkaError
 
 ...
 
     async def _poll_message(self) -> Any | None:
         """Poll for a message from Kafka using async getone()."""
         try:
             return await asyncio.wait_for(self.consumer.getone(), timeout=1.0)
         except asyncio.TimeoutError:
             return None
+        except ConsumerStoppedError:
+            return None
         except KafkaError as e:
             self.logger.error(f"Consumer error: {e}")
             return None

516-532: Consider adding request_timeout_ms for robustness.

The producer and consumer configurations look correct for aiokafka. However, consider adding explicit timeout configurations to prevent indefinite hangs during broker connectivity issues.

♻️ Optional timeout configuration
     consumer = AIOKafkaConsumer(
         topic_name,
         bootstrap_servers=settings.KAFKA_BOOTSTRAP_SERVERS,
         group_id=f"{GroupId.DLQ_MANAGER}.{settings.KAFKA_GROUP_SUFFIX}",
         enable_auto_commit=False,
         auto_offset_reset="earliest",
         client_id="dlq-manager-consumer",
+        request_timeout_ms=30000,
     )
     producer = AIOKafkaProducer(
         bootstrap_servers=settings.KAFKA_BOOTSTRAP_SERVERS,
         client_id="dlq-manager-producer",
         acks="all",
         compression_type="gzip",
         max_batch_size=16384,
         linger_ms=10,
         enable_idempotence=True,
+        request_timeout_ms=30000,
     )
backend/app/services/event_bus.py (1)

260-294: Short timeout may cause high CPU usage during idle periods.

The timeout=0.1 (100ms) in asyncio.wait_for means the loop iterates ~10 times per second even when idle. While functional, this is more aggressive than necessary for an event bus.

Consider increasing the timeout to reduce CPU overhead during idle periods, or using aiokafka's async iterator pattern.

♻️ Option 1: Increase timeout
-                    msg = await asyncio.wait_for(self.consumer.getone(), timeout=0.1)
+                    msg = await asyncio.wait_for(self.consumer.getone(), timeout=1.0)
♻️ Option 2: Use async iterator (cleaner pattern)
async def _kafka_listener(self) -> None:
    """Listen for Kafka messages from OTHER instances and distribute to local subscribers."""
    if not self.consumer:
        return

    self.logger.info("Kafka listener started")

    try:
        async for msg in self.consumer:
            if not self.is_running:
                break

            # Skip messages from this instance
            headers = dict(msg.headers) if msg.headers else {}
            source = headers.get("source_instance", b"").decode("utf-8")
            if source == self._instance_id:
                continue

            try:
                event_dict = json.loads(msg.value.decode("utf-8"))
                event = EventBusEvent.model_validate(event_dict)
                await self._distribute_event(event.event_type, event)
            except Exception as e:
                self.logger.error(f"Error processing Kafka message: {e}")

    except asyncio.CancelledError:
        self.logger.info("Kafka listener cancelled")
    except Exception as e:
        self.logger.error(f"Fatal error in Kafka listener: {e}")
📜 Review details

Configuration used: defaults

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 3c82f9d and 74a5971.

📒 Files selected for processing (9)
  • backend/app/dlq/manager.py
  • backend/app/events/consumer_group_monitor.py
  • backend/app/events/core/consumer.py
  • backend/app/events/core/producer.py
  • backend/app/services/event_bus.py
  • backend/app/services/idempotency/middleware.py
  • backend/app/settings.py
  • backend/tests/integration/dlq/test_dlq_manager.py
  • docs/components/schema-manager.md
🚧 Files skipped from review as they are similar to previous changes (2)
  • backend/app/settings.py
  • backend/app/services/idempotency/middleware.py
🧰 Additional context used
🧬 Code graph analysis (4)
backend/app/events/core/consumer.py (1)
backend/app/events/schema/schema_registry.py (1)
  • deserialize_event (125-151)
backend/tests/integration/dlq/test_dlq_manager.py (4)
backend/app/dlq/manager.py (1)
  • create_dlq_manager (507-544)
backend/app/domain/enums/kafka.py (1)
  • KafkaTopic (7-53)
backend/app/events/schema/schema_registry.py (1)
  • SchemaRegistryManager (50-197)
backend/app/events/core/producer.py (1)
  • producer (55-56)
backend/app/events/consumer_group_monitor.py (2)
backend/app/core/utils.py (1)
  • StringEnum (6-31)
backend/app/events/core/consumer.py (3)
  • state (197-198)
  • start (45-70)
  • consumer (209-210)
backend/app/services/event_bus.py (2)
backend/app/events/core/producer.py (1)
  • producer (55-56)
backend/app/domain/enums/kafka.py (1)
  • KafkaTopic (7-53)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (4)
  • GitHub Check: Scan Backend
  • GitHub Check: cubic · AI code reviewer
  • GitHub Check: E2E Tests
  • GitHub Check: Integration Tests
🔇 Additional comments (26)
backend/app/events/core/consumer.py (3)

50-64: LGTM! Clean AIOKafkaConsumer initialization.

The consumer configuration is properly mapped from the config object to aiokafka parameters. The async start pattern with topic subscription in the constructor is correct for aiokafka.


131-144: LGTM! Message processing correctly adapted to aiokafka.

The attribute access pattern (message.topic, message.value, etc.) correctly uses aiokafka's ConsumerRecord properties. The async deserialization call aligns with the schema registry changes.


255-256: seek() in aiokafka is synchronous—correct usage here.

In aiokafka 0.13.0, seek() is a synchronous method that doesn't require awaiting (unlike seek_to_beginning() and seek_to_end() which are async). The current usage at lines 255-256 is correct. Note that seeking before the consumer has fetched partition metadata may not work as expected—ensure the consumer has started and has assignments before calling seek().
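
A minimal guard along those lines, assuming an already-started AIOKafkaConsumer on self.consumer; the method itself is illustrative, not code from the PR:

from aiokafka import TopicPartition

async def _seek_if_assigned(self, topic: str, partition: int, offset: int) -> None:
    tp = TopicPartition(topic, partition)
    # assignment() is empty until the consumer has joined the group and received
    # partitions, so seeking earlier would fail or be lost on rebalance
    if tp in self.consumer.assignment():
        self.consumer.seek(tp, offset)  # synchronous in aiokafka
    else:
        self.logger.warning(f"Cannot seek {tp}: partition not assigned yet")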

backend/app/events/consumer_group_monitor.py (3)

27-36: LGTM! Well-defined ConsumerGroupState enum.

The enum correctly maps Kafka protocol states. Using StringEnum ensures proper string serialization for API responses.
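
For reference, a sketch of the pattern; the exact member names in the PR's enum may differ, and the StringEnum import path is assumed from backend/app/core/utils.py:

from app.core.utils import StringEnum  # assumed module path

class ConsumerGroupState(StringEnum):
    # Kafka protocol group states
    STABLE = "Stable"
    EMPTY = "Empty"
    PREPARING_REBALANCE = "PreparingRebalance"
    COMPLETING_REBALANCE = "CompletingRebalance"
    DEAD = "Dead"
    UNKNOWN = "Unknown"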


164-178: LGTM! Proper lazy initialization pattern for admin client.

The lazy initialization with _get_admin() and explicit close() method is a clean pattern for managing the admin client lifecycle.
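
A condensed sketch of that lifecycle, assuming bootstrap servers are passed in; the class and method bodies are illustrative, not the PR's implementation:

from aiokafka.admin import AIOKafkaAdminClient

class ConsumerGroupMonitor:
    def __init__(self, bootstrap_servers: str) -> None:
        self._bootstrap_servers = bootstrap_servers
        self._admin: AIOKafkaAdminClient | None = None

    async def _get_admin(self) -> AIOKafkaAdminClient:
        # create and start the admin client only on first use
        if self._admin is None:
            self._admin = AIOKafkaAdminClient(bootstrap_servers=self._bootstrap_servers)
            await self._admin.start()
        return self._admin

    async def close(self) -> None:
        if self._admin is not None:
            await self._admin.close()
            self._admin = None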


446-453: Division-by-zero edge case is handled correctly.

If every member has zero assigned partitions, both max_partitions and min_partitions are 0; the max_partitions > 0 guard short-circuits before any division, so this edge case is covered. See the sketch below.
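
An illustrative reduction of the guarded computation; partition_distribution (member_id -> assigned partition count) and the ratio formula are assumptions, not the PR's exact code:

partition_counts = list(partition_distribution.values())
max_partitions = max(partition_counts, default=0)
min_partitions = min(partition_counts, default=0)
# the guard short-circuits when no member holds any partitions
imbalance = (max_partitions - min_partitions) / max_partitions if max_partitions > 0 else 0.0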

backend/tests/integration/dlq/test_dlq_manager.py (2)

46-55: LGTM! Correct async producer lifecycle in test.

The test properly manages the aiokafka producer with start(), send_and_wait(), and stop() within a try/finally block, ensuring cleanup even on failure.
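
The essential shape of that lifecycle, with a hypothetical topic and payload in place of the test's schema-serialized event:

producer = AIOKafkaProducer(bootstrap_servers=settings.KAFKA_BOOTSTRAP_SERVERS)
await producer.start()
try:
    await producer.send_and_wait("dlq.example-topic", b'{"reason": "handler failed"}')
finally:
    await producer.stop()  # runs even if the send fails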


28-29: LGTM! Direct SchemaRegistryManager instantiation.

The change from factory function to direct class instantiation aligns with the broader migration. The test correctly passes required dependencies.

docs/components/schema-manager.md (3)

19-19: LGTM! Documentation accurately reflects async API.

The documentation correctly states that all registry operations are async and must be awaited, aligning with the code changes.


23-23: LGTM! Clear async usage guidance.

The documentation clearly explains that serialize_event and deserialize_event must be awaited, and accurately describes the wire format and caching behavior.


29-29: LGTM! Startup sequence documentation updated.

The documentation correctly emphasizes that initialize_schemas() and apply_all() are async and must be awaited during startup.
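
A minimal usage sketch of the documented async flow; the constructor arguments and the deserialize_event parameter list are placeholders, since only the method names appear here:

registry = SchemaRegistryManager(...)            # wired with the async registry client in the real app
await registry.initialize_schemas()              # startup: must be awaited
data = await registry.serialize_event(event)     # async: Confluent wire-format bytes
event2 = await registry.deserialize_event(data)  # async: exact signature per schema_registry.py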

backend/app/events/core/producer.py (3)

63-73: LGTM! Proper AIOKafkaProducer initialization.

The producer configuration is correctly mapped to aiokafka parameters. The max_batch_size parameter correctly replaces confluent-kafka's batch configuration.
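
For readers coming from confluent-kafka, an illustrative mapping of the old config keys onto AIOKafkaProducer parameters; the values are examples, not the PR's settings:

producer = AIOKafkaProducer(
    bootstrap_servers=settings.KAFKA_BOOTSTRAP_SERVERS,  # was "bootstrap.servers"
    acks="all",                                          # was "acks"
    compression_type="gzip",                             # was "compression.type"
    max_batch_size=16384,                                # was "batch.size"
    linger_ms=10,                                        # was "linger.ms"
    enable_idempotence=True,                             # was "enable.idempotence"
)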


131-132: LGTM! Correct header format for aiokafka.

The conversion of headers to a list of tuples with byte values [(k, v.encode()) for k, v in headers.items()] is the correct format for aiokafka.


214-223: LGTM! DLQ publishing correctly uses send_and_wait.

The DLQ publishing correctly uses send_and_wait for guaranteed delivery and properly formats headers as a list of tuples with byte values.
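
A minimal sketch of that publishing pattern; the topic name, key, and header set are hypothetical:

headers = {"original_topic": original_topic, "error": error_message}
await producer.send_and_wait(
    "dlq.events",
    value=payload_bytes,
    key=event_id.encode("utf-8"),
    headers=[(k, v.encode("utf-8")) for k, v in headers.items()],  # list of (str, bytes) tuples
)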

backend/app/dlq/manager.py (6)

7-8: LGTM!

Imports correctly updated from confluent-kafka to aiokafka types and errors.


31-53: LGTM!

Constructor and instance attributes correctly typed for aiokafka clients. The migration from confluent-kafka types to AIOKafkaConsumer and AIOKafkaProducer is clean.


120-148: LGTM!

Message parsing correctly adapted for aiokafka's ConsumerRecord:

  • Uses msg.value (property) instead of msg.value() (method)
  • Uses msg.offset and msg.partition properties directly
  • Header extraction handles aiokafka's list-of-tuples format correctly
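
An illustrative parse of an aiokafka ConsumerRecord along those lines; the resulting dict keys and the original_topic header are hypothetical:

import json

headers = {k: v.decode("utf-8") for k, v in (msg.headers or [])}  # list of (str, bytes) tuples
dlq_entry = {
    "payload": json.loads(msg.value.decode("utf-8")),  # .value is a property, not a method
    "topic": msg.topic,
    "partition": msg.partition,
    "offset": msg.offset,
    "original_topic": headers.get("original_topic", ""),
}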

150-177: LGTM!

Async lifecycle correctly implemented:

  • await self.producer.start() and await self.consumer.start() for initialization
  • await self.consumer.stop() and await self.producer.stop() for cleanup
  • Task cancellation properly handled with CancelledError catch
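
A compressed sketch of that start/stop flow; the class wrapper and _consume_loop are implied context, not the PR's code:

import asyncio

class _DLQLifecycle:
    async def start(self) -> None:
        await self.producer.start()
        await self.consumer.start()
        self._task = asyncio.create_task(self._consume_loop())

    async def stop(self) -> None:
        self._task.cancel()
        try:
            await self._task
        except asyncio.CancelledError:
            pass
        await self.consumer.stop()
        await self.producer.stop()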

238-242: LGTM!

Commit correctly uses await self.consumer.commit() for aiokafka's async commit API.


322-341: LGTM!

Producer operations correctly use send_and_wait() for reliable delivery:

  • Headers formatted as list of (str, bytes) tuples as required by aiokafka
  • Both retry topic and original topic publishing use the same pattern
  • No need for explicit flush() since send_and_wait() awaits delivery confirmation
backend/app/services/event_bus.py (6)

10-11: LGTM!

Imports correctly updated for aiokafka.


41-68: LGTM!

Good design decisions:

  • Docstring clearly explains the distributed behavior and self-filtering pattern
  • _instance_id enables filtering out self-published messages
  • Type annotations correctly updated for aiokafka clients

76-97: Unique group_id per instance prevents shared consumption - verify this is intentional.

Using group_id=f"event-bus-{uuid4()}" means each instance gets its own consumer group, so every instance receives all messages (fan-out behavior). This appears intentional based on the docstring ("events from OTHER instances"), but differs from typical consumer group semantics.

If the intent is truly fan-out (all instances receive all events), this is correct. If load-balanced consumption is ever needed, this pattern would need adjustment.
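
An illustrative contrast of the two modes; topic and group names are placeholders:

from uuid import uuid4
from aiokafka import AIOKafkaConsumer

fanout_consumer = AIOKafkaConsumer(
    "event-bus-topic",
    bootstrap_servers=settings.KAFKA_BOOTSTRAP_SERVERS,
    group_id=f"event-bus-{uuid4()}",  # unique per instance: every instance sees every event
)
shared_consumer = AIOKafkaConsumer(
    "event-bus-topic",
    bootstrap_servers=settings.KAFKA_BOOTSTRAP_SERVERS,
    group_id="event-bus",             # shared group: partitions split across instances (load-balanced)
)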


99-123: LGTM!

Clean async shutdown:

  • Consumer task properly cancelled with CancelledError handling
  • Both consumer and producer stopped with await .stop()
  • Subscriptions cleared under lock

125-151: LGTM!

Publish implementation is solid:

  • source_instance header correctly encodes instance ID for filtering
  • send_and_wait() ensures delivery confirmation
  • Error handling logs failures without propagating (fire-and-forget semantic appropriate for event bus)
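
A minimal sketch of the publish side, assuming self.producer is a started AIOKafkaProducer, self._instance_id is the UUID string used for self-filtering, and EventBusEvent is a Pydantic model (as model_validate in the listener suggests):

await self.producer.send_and_wait(
    topic,
    value=event.model_dump_json().encode("utf-8"),
    headers=[("source_instance", self._instance_id.encode("utf-8"))],  # consumed by the self-filter
)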

273-276: Missing-header access is already handled safely via .get().

When the header list is converted to a dict, the lookup uses headers.get() with a b"" fallback, so a missing source_instance header decodes to an empty string instead of raising KeyError. One subtlety worth noting: duplicate header keys are silently collapsed when the dict is built (the last value wins), which is acceptable for this filter.
